Adding node_id to ExecutionPlanProperties #12186
base: main
Conversation
…notation to combined create_physical_plan API
We had similar challenges when using DataFusion in a context similar to yours (checkpointing, etc.). I have consulted with my team on how we solved them, and discussed this general approach. I can share the following general thoughts/concerns:
Therefore, we think the right approach is to traverse the tree (for plans or streams, depending on your use case), generate the IDs as you see fit, and store them downstream in a map container that associates nodes with your IDs. I don't think doing this upstream inside

Apologies for not being able to provide feedback and start the detailed discussion before you prepared a PR, but in my defense you were too fast :) I think it would be great to add the traversal code and the map approach I mentioned as an example upstream, for future users who want to do something like this. This is an approach we are following for indexing work as well |
Thanks for the feedback @ozankabak. Would it make sense to add this traversal code in the utils? It would be nice for this to be available in the core library itself. |
Let's create a new example file, add the traversal logic and a simple demonstration of how to store the node <-> id association via a map in that file. I think it will be a fairly succinct yet guiding example for many others. You may run into a difficulty creating or inserting into a map with

We wanted to make |
Taking a second look at implementing this: using a global hashmap with pointers to Arc as the solution is somewhat hacky and error prone, and as developers trying to build on top of DataFusion it makes our experience feel pretty brittle. While we are interested in using the node_id for stateful checkpointing, it does make sense for this to be on ExecutionPlan since it is a node in the PhysicalPlan graph.
Streams can easily derive their id as "{node_id}_{stream_id}".
Presumably, any user interested in using the node_id would make their operators implement with_node_id.
For additional types of node ids, I suppose one could maintain them in a HashMap<usize, T> where the usize key is the node_id rather than a pointer to the Arc. |
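As a concrete illustration of both points, here is a minimal, hypothetical sketch: an operator opting into the id via a with_node_id-style method, plus extra per-node state kept in a HashMap<usize, T> keyed by that id. The operator, its fields, and the exact method signature are illustrative assumptions, not the API added by this PR.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical custom operator; only the node_id-related parts are shown.
#[derive(Debug, Clone)]
struct MyCustomExec {
    node_id: Option<usize>,
}

impl MyCustomExec {
    // Sketch of a `with_node_id`-style method: return a copy of the operator
    // with the id assigned, leaving everything else untouched.
    fn with_node_id(self: Arc<Self>, node_id: usize) -> Arc<Self> {
        let mut new = self.as_ref().clone();
        new.node_id = Some(node_id);
        Arc::new(new)
    }
}

fn main() {
    let exec = Arc::new(MyCustomExec { node_id: None }).with_node_id(7);

    // Additional per-node state can then live in a side map keyed by node_id.
    let mut checkpoints: HashMap<usize, String> = HashMap::new();
    if let Some(id) = exec.node_id {
        checkpoints.insert(id, format!("checkpoint/{id}"));
    }
}
```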
Can you share a link to your implementation attempt? This is a little surprising, I'd like to go over it and understand what is going on. |
@ozankabak would love any pointers. This code is admittedly a quick draft. We pass in a hashmap and recursively traverse the tree.
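For illustration, a minimal sketch of that kind of traversal (not the actual draft code): walk the plan pre-order, hand out sequential ids, and record them in a caller-supplied map. Since Arc<dyn ExecutionPlan> is not itself hashable, this sketch keys the map by the Arc's pointer address, which is exactly the kind of fragility discussed above; the function name and key scheme are assumptions.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Pre-order traversal that assigns a sequential id to every node and stores
/// it in `ids`, keyed by the Arc's pointer address. Illustrative only.
fn annotate_node_ids(
    plan: &Arc<dyn ExecutionPlan>,
    next_id: &mut usize,
    ids: &mut HashMap<usize, usize>, // pointer address -> node_id
) {
    let key = Arc::as_ptr(plan) as *const () as usize;
    ids.insert(key, *next_id);
    *next_id += 1;
    // In recent DataFusion versions children() yields `&Arc<dyn ExecutionPlan>`.
    for child in plan.children() {
        annotate_node_ids(child, next_id, ids);
    }
}
```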
Then, when executing the plan, we need to actually annotate it and set a global hash map singleton.
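A rough sketch of what such a singleton could look like, assuming std's OnceLock; the names are illustrative rather than taken from the draft branch.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Process-wide map from node key (pointer address) to node_id, populated
/// right before the annotated plan is executed. Illustrative only.
static NODE_IDS: OnceLock<Mutex<HashMap<usize, usize>>> = OnceLock::new();

fn set_node_ids(ids: HashMap<usize, usize>) {
    let map = NODE_IDS.get_or_init(|| Mutex::new(HashMap::new()));
    *map.lock().unwrap() = ids;
}

fn lookup_node_id(key: usize) -> Option<usize> {
    NODE_IDS
        .get()
        .and_then(|map| map.lock().unwrap().get(&key).copied())
}
```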
Now, when creating a Stream, we do need to pass a reference to the ExecutionPlan it is tied to so that it can figure out the channel_tag/checkpoint_tag it is supposed to use to coordinate checkpoints. Btw, the annotation output is something like -
In contrast, this PR annotates the node_ids during
|
Just chiming in -- the implementation in this PR seems quite reasonable to me. While there are definitely ways to hack around the limitation of not having node IDs, those strategies would be quite vulnerable to upstream breaking changes, and given DataFusion's goal of being extensible it makes sense for these to be a core feature of the library. |
Thanks for the sketch, I see the challenge. After reviewing your sketch and going back and inspecting our solution in our fork, it seems to me like it may indeed make sense to add this functionality natively to upstream DF. That being said, I am worried about a few things (default value being

Next week, I (or maybe @berkaysynnada) will work on a draft PR to flesh out some ideas on our end, then we can compare/contrast and arrive at a final design together. Then we can get some extra community review and merge this if no one else has other concerns or objections (whether this functionality should belong to core, etc.). Thanks for the awesome collaboration. |
Just wanted to leave a note to stress that I haven't forgotten about this -- this is on our shortlist to focus on after resolving some urgent issues. |
Thanks for taking the time. Looking forward to continuing the discussion. |
@ozankabak have you had a chance to take a look at this yet? |
It is on my agenda for this week. I have been traveling, which caused a delay on this project (and some others). I remain convinced that some version of this functionality belongs in the core, so rest assured we will get it across the finish line sooner or later (unless someone else raises a reasonable objection that eludes me). |
Going through our implementation I see that we were able to get around the non-existing ID problem by using string IDs and an auto-generation logic for non-leaf nodes. I will probe further to get a good understanding of how we can arrive at a safe and extensible design and circle back. Probably @berkaysynnada will join the collaboration as well. Thanks for your patience. |
Hey @ozankabak have you gotten a chance to revisit this yet? |
Sorry, some other priorities kept me busy and this slipped through the cracks. @berkaysynnada let's prioritize this for next week. |
Apologies for the delay in responding. I have started working on this issue and will open a draft PR to facilitate discussion, FYI @emgeee. I plan to share it today or tomorrow. |
Hi @emgeee and @ameyc. I apologize again for the delay in providing feedback, but I believe we can iterate quickly now and align on the best design to suit every use case.

Here is the approach we use, and I believe it could address your concerns while minimizing changes to the existing APIs. We introduce generate_id and with_id methods to the ExecutionPlan trait:

```rust
trait ExecutionPlan {
...
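// Default: compose this node's ID from its children's IDs; source (leaf)
// operators have no children and must override this method.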
fn generate_id(&self) -> Result<String> {
let children = self.children();
match children.as_slice() {
[] => exec_err!("Source operator {:?} should implement the generate_id method", self.type_name()),
[child] => {
let child_id = child.generate_id()?;
Ok(format!("[{child_id}]"))
}
children => children
.iter()
.map(|child| child.generate_id())
.collect::<Result<Vec<_>>>()
.map(|ids| {
let result = ids.join(", ");
format!("[{result}]")
}),
}
}
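// Default: only source (leaf) operators need to store an ID; all other
// operators leave the plan unchanged and return Ok(None).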
fn with_id(self: Arc<Self>, id: String) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if self.children().is_empty() {
not_impl_err!("Source operators must implement with_id() method")
} else {
Ok(None)
}
}
...
}
```

Source operators, such as CsvExec, implement these methods:

```rust
impl ExecutionPlan for CsvExec {
...
fn generate_id(&self) -> Result<String> {
let Some(id) = self.id.as_ref() else {
return exec_err!("Source ID is not set");
};
Ok(id.clone())
}
fn with_id(self: Arc<Self>, id: String) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let mut exec = self.as_ref().clone();
exec.id = Some(id);
Ok(Some(Arc::new(exec)))
}
...
}
```

One significant advantage of this approach is that it introduces minimal API changes, focusing only on source operators. It also provides flexible usage through customizable nesting logic for IDs. For instance, relationships between partitioned inputs or child operators can be refined to align with use-case needs.

A potential limitation is the use of strings for IDs. However, this choice offers flexibility for cases where human-readable identifiers are preferred. Additionally, it makes the child-parent relationships self-explanatory, which is beneficial for debugging and visualization purposes.

In our use case, we construct a parallel tree to enrich operator metadata, which we use for snapshotting. This secondary tree is built based on these IDs and is designed to remain isolated from the original execution plan. We have utility functions that streamline this process, and we'd be happy to contribute them upstream if needed.

If this approach aligns with upstream requirements, I'd be glad to collaborate on refining and integrating it, along with the necessary utilities. Let me know if this sounds like a viable direction to proceed. |
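To illustrate the parallel-tree idea, here is a rough sketch of building a side table keyed by the generated string IDs. It assumes the generate_id method proposed above were available on ExecutionPlan; the metadata struct and helper function are made up for the example.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical per-node metadata kept outside the plan (e.g. for snapshotting).
#[derive(Default)]
struct NodeMetadata {
    checkpoint_path: Option<String>,
}

/// Walk the plan and create an entry for each node, keyed by its generated ID.
/// Relies on the `generate_id` method sketched in the comment above.
fn build_metadata_table(
    plan: &Arc<dyn ExecutionPlan>,
    table: &mut HashMap<String, NodeMetadata>,
) -> Result<()> {
    let id = plan.generate_id()?;
    table.entry(id).or_default();
    for child in plan.children() {
        build_metadata_table(child, table)?;
    }
    Ok(())
}
```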
Which issue does this PR close?
Closes #11364
Rationale for this change
Currently, ExecutionPlans don't have an identifier associated with them, making it hard to distinguish between nodes for use cases such as snapshotting continuous pipelines, displaying node metrics in a UI, etc.
What changes are included in this PR?
Changes to:
- ExecutionPlanProperties, to add node_id: Option<usize>
- ExecutionPlan, to add a with_node_id() method that returns a copy of the ExecutionPlan with the assigned node id
- SessionState, to add node_id annotation to finalized physical plans
- physical-plan/src/node_id.rs, to traverse ExecutionPlans and generate deterministic ids for the whole tree

Are these changes tested?
Added asserts to an existing test in datafusion-examples/src/planner_api.rs.
No